python处理字节流形式的视频

您所在的位置:网站首页 Python 处理视频 python处理字节流形式的视频

python处理字节流形式的视频

2024-07-10 23:08| 来源: 网络整理| 查看: 265

python处理内存中字节流形式的视频

在使用python的streamlit库处理上传的文件时碰到一个问题,文件上传后是以字节数组的形式存在内存中,我在后续需要使用cv2库逐帧操作上传的视频,这里就产生一个问题,cv2怎么读取到内存中字节形式的视频? 经过多次查找,发现可以使用ffmpeg从内存中将字节流解码,然后再由cv2处理。

代码如下:

import streamlit as st # streamlit中上传文件的函数 @st.cache(allow_output_mutation=True,show_spinner=False) def load_local_video(uploaded_video): bytes_data = uploaded_video.getvalue() print(uploaded_video.name) return bytes_data uploaded_video = st.sidebar.file_uploader(" ") video = load_local_video(uploaded_video)

处理逻辑就是使用ffprobe获取字节流中内存相关信息,然后使用ffmpeg解码。随后从字节流中逐帧获取视频。

import numpy as np import cv2 import io import subprocess as sp import threading import json from functools import partial import shlex # Write to stdin in chunks of 1024 bytes. def writer(stream,process): for chunk in iter(partial(stream.read, 1024),b''): process.stdin.write(chunk) try: process.stdin.close() except (BrokenPipeError): pass # For unknown reason there is a Broken Pipe Error when executing FFprobe. def do(video): bytes_stream = io.BytesIO(video) #执行子进程并定到输入输出的接口,这里使用ffprobe检测视频的信息 # 注意,这里的路径是我本地的路径,运行代码的时候需要更改路径,并且确保你的电脑装上了ffmpeg process = sp.Popen(shlex.split('E:/ffmpeg-4.3.1-2020-11-19-essentials_build/bin/ffprobe -v error -i pipe: -select_streams v -print_format json -show_streams'), stdin=sp.PIPE, stdout=sp.PIPE, bufsize=10**8) pthread = threading.Thread(target=writer,args=(bytes_stream,process)) pthread.start() pthread.join() #从标准输入输出流中获取视频信息 in_bytes = process.stdout.read() process.wait() p = json.loads(in_bytes) #得到视频的分辨率 width = (p['streams'][0])['width'] height = (p['streams'][0])['height'] bytes_stream.seek(0) process = sp.Popen(shlex.split('E:/ffmpeg-4.3.1-2020-11-19-essentials_build/bin/ffmpeg -i pipe: -f rawvideo -pix_fmt bgr24 -an -sn pipe:'), stdin=sp.PIPE, stdout=sp.PIPE, bufsize=10**8) thread = threading.Thread(target=writer,args=(bytes_stream,process)) thread.start() #将视频解码完毕 all_bytes = process.stdout.read()#将视频字节从内存中读取出来 counter = len(all_bytes)/(width * height * 3) #print(str(counter)+' 帧') l = 0 my_bar = st.progress(0) with st.spinner(text="视频检测中"): while l


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3